package com.qingyunge.util;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.qingyunge.common.FlinkPhConfig;
import org.apache.commons.lang3.StringUtils;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;

public class PhoenixUtil {
    public static void upsertValues(DruidPooledConnection connection, String sinkTable, JSONObject data) throws SQLException {
        // 1 拼接sql
        Set<String> columns = data.keySet();
        Collection<Object> values = data.values();
        String sql = "upsert into " + FlinkPhConfig.HBASE_SCHEMA + "." +
                sinkTable + "(" + StringUtils.join(columns, ",") + ") values ('" +
                StringUtils.join(values, "','") + "')";
        // 2 预编译sql
        PreparedStatement preparedStatement = connection.prepareStatement(sql);
        // 3 执行sql
        preparedStatement.execute();
        connection.commit();
        // 4 close
        preparedStatement.close();
    }
}